package com.gvsoft.communication.client;

import com.gvsoft.communication.client.cbufferpool.CReadBufferPool;
import com.gvsoft.communication.client.gather.ConcurrentGather;
import com.gvsoft.communication.client.model.inf.IClientOrder;
import com.gvsoft.communication.client.model.inf.IServerPacket;
import com.gvsoft.communication.client.model.order.ClientKeepOrder;
import com.gvsoft.communication.client.model.order.ClientMsgOrder;
import com.gvsoft.communication.client.net.KeepChannelThread;
import com.gvsoft.communication.client.net.ReconnectThread;
import com.gvsoft.communication.client.net.ServerPacketHandleThread;
import com.gvsoft.communication.client.net.adapter.IOAdapter;
import com.gvsoft.communication.client.net.handle.inf.IServerPacketHandle;
import com.gvsoft.communication.console.ControlListener;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.*;

/**
 * Created with IntelliJ IDEA.
 * ProjectName:gvMsgClient
 * Author: zhaoqiubo
 * Date: 15/9/9
 * Time: 下午4:19
 * Desc: 客户端通讯主类
 */
public class Client implements Closeable {

    private final static Logger logger = LogManager.getLogger("client");
    /**
     * 当前client配置类
     */
    private ClientConfig cfg;
    /**
     * 当前client所使用的通道
     */
    private SocketChannel channel;
    /**
     * 当前client启动的selector
     */
    private Selector selector;

    /**
     * 读取处理线程池
     */
    private ExecutorService readHandleExecutor;
    /**
     * 当前client的selector线程
     */
    private ClientListener listener;
    /**
     * IO适配器,将网络包处理成类
     */
    private IOAdapter ioAdapter;
    /**
     * 记录写入的时候的断包位置(目前我认为与使用何种通讯协议无关)
     */
    private int writePos = 0;
    /**
     * 维持链路线程
     */
    private KeepChannelThread keepChannelThread;
    /**
     * 短线重连线程
     */
    private ReconnectThread reconnectThread;
    /**
     * 存储写入包的队列
     */
    public LinkedBlockingQueue<ByteBuffer> writeQ = new LinkedBlockingQueue<ByteBuffer>();
    /**
     * nio事件key
     */
    private SelectionKey key;
    /**
     * 客户端状态枚举
     */
    private ClientStatus status;
    /**
     * 状态实现接口
     */
    private IStatusHandle statusHandle = new DefaultStatusHandle();
    /**
     * 服务端分配的令牌
     */
    private String token;

    /**
     * 客户端身份标识,就是client_id
     */
    private String userId;
    /**
     * 标记客户端登录的请求流水号，如果此请求流水号有相应，证明服务端已经处理完登录请求；
     */
    private String lrid = "";
    /**
     * 存储发送出去的维持链路报文
     */
    private Map<String, ClientKeepOrder> keepOrderMap = new ConcurrentHashMap<String, ClientKeepOrder>();
    /**
     * 管理控制监听器
     */
    private static ControlListener controlListener;
    /**
     * 保存多个client实例的map
     */
    private static Map<String,Client> clientMap = new ConcurrentHashMap<String,Client>();

    /**
     * client状态枚举定义
     */
    public enum ClientStatus {
        ACTIVE, CLOSED, RECONNECT, NEW;
    }

    /**
     * 标记配置是否加载完成
     */
    private boolean isCfgFinished;

    public Client() {

    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    /**
     * 构造时读取配置文件，这样可以让后续继承开发者，
     * 创建了Client之后，通过ClientConfig直接获取配置参数
     *
     * @param file
     */
    public Client(String file) {
        this.cfg = new ClientConfig(file);

        if (initConfig()) {
            isCfgFinished = true;
        }
        this.userId = cfg.getProperty("client_id");
        if (StringUtils.isEmpty(userId)){
            throw new RuntimeException("客户端标识为空,必须指定client_id");
        }
        clientMap.put(this.getUserId(),this);
    }


    private void setKey(SelectionKey key) {
        this.key = key;
    }

    public boolean isActive() {
        return this.status == ClientStatus.ACTIVE;
    }

    public boolean isClosed() {
        return this.status == ClientStatus.CLOSED;
    }

    public boolean isReconnect() {
        return this.status == ClientStatus.RECONNECT;
    }

    public void setActive() {
        this.status = ClientStatus.ACTIVE;
        statusHandle.afterActived();
    }

    public boolean isNew() {
        return this.status == ClientStatus.NEW;

    }

    public void setClosed() {
        this.status = ClientStatus.CLOSED;
        statusHandle.afterClosed();
    }

    public void setReconnect() {
        this.status = ClientStatus.RECONNECT;
        statusHandle.afterReconnect();
    }

    public void setNew() {
        this.status = ClientStatus.NEW;
        statusHandle.afterNew();
    }

    public ClientStatus getClientStatus() {
        return this.status;
    }

    public ClientConfig getCfg() {
        return cfg;
    }

    /**
     * 如果配置文件已经在构造函数中制定，则直接调用此方法，不必再次传入配置文件路径
     *
     * @param userId
     */
    public void start(String userId) {
        this.userId = userId;
        regIOAdapter();
        this.setClosed();
        //启动链路维护线程
        keepChannelThread = (new KeepChannelThread(this, this.getUserId()+"-keepThread"));
        keepChannelThread.start();
        //自动重连线程
        reconnectThread = (new ReconnectThread(this,this.getUserId()+"-reconnectThread"));
        reconnectThread.start();
        readHandleExecutor = new ThreadPoolExecutor(cfg.getReadHandleThreadCount()
                , cfg.getReadHandleThreadCount(), 1000, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(cfg.getReadHandleThreadCount() * 30));
        connectServer();
        addShutdownHook();
    }

    /**
     * 注册io适配器,io适配器主要完成对网络层数据包进行适配处理
     */
    private void regIOAdapter() {
        Class c = null;
        Object adapterClass = null;
        try {
            c = Class.forName(cfg.getProperty("ioadapter"));
            adapterClass = c.newInstance();

        } catch (Exception e) {
            throw new RuntimeException("找不到IO适配器，请检查相关配置！");
        }
        this.ioAdapter = (IOAdapter) adapterClass;//指定解析通讯报文的适配器
    }

    /**
     * 启动读取配置，连接服务器，以及各个线程。
     */
    public void start(String userId, String cfgFile) {
        if (!isCfgFinished) {
            this.cfg = new ClientConfig(cfgFile);
            initConfig();
        }
        this.start(userId);
    }

    private void addShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            public void run() {
                close();
                logger.info("client已关闭!");
            }
        });
    }

    /**
     * 连接服务端,里面通过死循环while手工阻塞连接过程.如果一直没有连接成功则不会触发对读事件的处理.也不会触发重连
     */
    public void connectServer() {

        try {

            InetSocketAddress serverAddress = new InetSocketAddress(cfg.getServerIp(), cfg.getServerPort());
            channel = SocketChannel.open();
            //讲通道设置为非阻塞模式
            channel.configureBlocking(false);
            //打开多路复用器
            selector = Selector.open();

            key = channel.register(selector, SelectionKey.OP_CONNECT, this);
            if (listener != null) {
                listener.setInterrupt();
                listener.interrupt();
            }
            if (this.isReconnect()) {
                listener.setSelector(selector);
            }
            listener = new ClientListener(this, selector);
            listener.start();
            channel.connect(serverAddress);
            this.setNew();
            while (!channel.finishConnect()) {

                //wait, or do something else...
            }
            this.setKey(channel.register(selector, SelectionKey.OP_READ, this));
            clearKeepOrder();

            logger.info(this.getUserId() + "连接通道为:" + channel);
        } catch (IOException e) {
            logger.info("无法连接服务端!服务端ip:" + cfg.getServerIp() + "，port：" + cfg.getServerPort() + "，请检查服务端状态或客户端参数配置。" + e.getMessage());
            releaseAll();
            setReconnect();

        }

    }

    private boolean initConfig() {
        return cfg.initConfig();

    }

    public String getToken() {
        return token;
    }

    public void setToken(String token) {
        this.token = token;
    }

    public String getUserId() {
        return userId;
    }

    public SocketChannel getChannel() {
        return channel;
    }

    private void execReadHandle(Runnable runnable) {
        readHandleExecutor.submit(runnable);
    }

    /**
     * 写入一个order到服务端
     * @param order 实现了IClientOrder,并需要符合通讯协议.
     */
    public void write(IClientOrder order) {
        //创建一个byteBuffer用来存储要写入的buffer
//        ByteBuffer byteBuffer = CWriteBufferPool.getBuffer();//不使用写buffer池，当压力足够大的时候，会有线程安全问题。
        try {
            ByteBuffer byteBuffer = ByteBuffer.allocate(cfg.getWriteBlock());
            ioAdapter.analyWrite(order, byteBuffer);
            this.writeQ.offer(byteBuffer);
//        CWriteBufferPool.freeBuffer(byteBuffer);
            if(key.isValid()) {
                this.key.interestOps(SelectionKey.OP_WRITE);
                this.key.selector().wakeup();
            }
        } catch (Exception e) {
            logger.error("error:"+e.getMessage());
        }

    }

    /**
     * 发送一个消息对象
     * @param info 实现了BaseInfoInterface接口的类对象,该对象会被整理以json字符串的方式序列化,然后传送到服务端
     */
    public void sendInfo(BaseInfoInterface info){
        if(null != null) {
            ClientMsgOrder msgOrder = new ClientMsgOrder(getToken(), info);
            write(msgOrder);
        }
    }

    /**
     * 释放通道
     */
    public void releaseAll() {
        try {
            if (this.key != null) {
                if (this.key != null) {
                    this.key.cancel();
                }
                if (null != this.channel) {
                    this.channel.close();
                }
                if (this.selector != null) {
                    this.selector.close();
                    this.selector = null;
                }
            }

        } catch (IOException e) {
            logger.info(e.getMessage());
        }
    }

    /**
     * 中断所有正在执行的线程,并释放通道.
     */
    public void close() {
        reconnectThread.setInterrupt();
        keepChannelThread.setInterrupt();
        if (controlListener.isAlive()) {
            controlListener.interrupt();
        }
        listener.setInterrupt();
        readHandleExecutor.shutdownNow();
        releaseAll();
    }

    /**
     * 把通讯包对象放入线程池,异步处理.
     */
    public void asyncHandlePacket(IServerPacket packet) {
        ConcurrentGather.increasingRead();
        execReadHandle(new ServerPacketHandleThread(packet, key));


    }

    /**
     * 处理读事件
     *
     * @throws IOException
     */
    protected void doRead() throws IOException {
        //获取事件key中的channel
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer byteBuffer = CReadBufferPool.getBuffer();
//        ByteBuffer byteBuffer = ByteBuffer.allocate(ClientConfig.getReadBlock());
        //清理缓冲区，便于使用
        byteBuffer.clear();
        //将channel中的字节流读入缓冲区
        try {
            int count = channel.read(byteBuffer);
            //务必要把buffer的position重置为0
            byteBuffer.flip();
            if (count > 0) {
                ioAdapter.analyRead(this, byteBuffer);
                CReadBufferPool.freeBuffer(byteBuffer);
            } else if (count < 0) {
                releaseAll();
                setReconnect();
                logger.info(getUserId() + "读取失败，重连开始……");
            }
        } catch (IOException e) {
            throw e;
        }
    }

    /**
     * 处理写事件
     */
    protected void doWrite() {
        SocketChannel channel = (SocketChannel) key.channel();
        synchronized (writeQ) {
            while (true) {
                ByteBuffer byteBuffer = writeQ.peek();
                if (byteBuffer == null) {
                    key.interestOps(SelectionKey.OP_READ);
                    key.selector().wakeup();
                    break;
                }
                if (byteBuffer != null) {
                    try {
                        //如果上次写入的是断包，则将byteBuffer移动到上次写入的位置，
                        if (writePos > 0) {
                            byteBuffer.position(writePos);
                        }
                        ConcurrentGather.increasingWrite();
                        channel.write(byteBuffer);
                        //如果buffer没有全部被写入，那么记录写入位置后，中断循环，等待下次OP_WRITE
                        if (byteBuffer.remaining() > 0) {
                            writePos = byteBuffer.position();
                            break;
                        } else {
                            writePos = 0;
                            writeQ.remove();
                            key.interestOps(SelectionKey.OP_READ);
                            key.selector().wakeup();
                        }
                    } catch (Exception e) {
                        if (!channel.isOpen()) {
                            writeQ.remove();
                            releaseAll();
                            setReconnect();
                        }
                        logger.error("write failure ! error:" + e.getMessage());
                        e.printStackTrace();
                    }
                }
            }

        }
    }

    /**
     * 注册一个消息处理器,针对某一个类型的消息进行处理.cmd参数不能为T,L,K,R.也不能重复.
     *
     * @param cmd 处理器标识,例如:M
     * @param handle 处理器实现类对象
     */
    public void registerHandle(String cmd, IServerPacketHandle handle) {
        cfg.PACKET_HANDLE_CLASS_MAP.put(cmd, handle);
    }

    /**
     * 注册一个状态处理器;
     *
     * @param statusHandle
     */
    public void regClientStatusHandle(IStatusHandle statusHandle) {
        this.statusHandle = statusHandle;
    }

    public String getLrid() {
        return lrid;
    }

    public void setLrid(String lrid) {
        this.lrid = lrid;
    }
    public void addKeepOrder(ClientKeepOrder order) {
        this.keepOrderMap.put(order.getRid(), order);
    }
    public void clearKeepOrder() {
        this.keepOrderMap.clear();
    }
    public int getKOrderMapSize() {
        return this.keepOrderMap.size();
    }
    public Map getOrderMap() {
        return this.keepOrderMap;
    }
    public static void main(String[] arg) {

        Integer clientCount = 5;
        Client[] client = new Client[clientCount];

        for (int i = 0; i < clientCount; i++) {
            client[i] = new Client();
            client[i].start("3user[" + i + "]", System.getProperty("user.dir") + "/target/classes/client_cfg.properties");
        }
        Client.regConsole(System.getProperty("user.dir") + "/target/classes/common_client_cfg.properties");
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            logger.error(e.getMessage());
//            e.printStackTrace();
        }
    }

    public static Map<String,Client> getClientMap(){
        return clientMap;
    }

    public static boolean regConsole(String commonCfgFile){
        boolean flag = ClientConfig.initCommonCfg(commonCfgFile);
        //启动控制管理监听器
        controlListener = new ControlListener("ControlListener", ClientConfig.getConsolePort());
        controlListener.start();
        return flag;
    }


}
